# Weakly Supervised Labeling of Documents (PART 1)
## Objective

This notebook demonstrates how to use structured outputs from OpenAI's GPT-4o-mini model to label data in climate-related research papers. The task involves analyzing academic texts to identify and classify mentions of datasets while keeping context consistent across pages.


## Workflow

**PDF Text Extraction:**
   * Use PyMuPDF to extract pages from PDF documents.
   * Prefilter document pages using a fine-tuned classifier hosted on Hugging Face.

**Weakly Supervised Data Labeling**
   * Use the GPT-4o-mini model with a customized prompt for structured data extraction.

**LLM as a Judge (Validation & Error Correction):**
   * Use an LLM to validate extracted dataset mentions.
   * Correct or remove errors in dataset identification.
   * Filter only **valid dataset mentions (`valid: true`)**, discarding invalid entries.

**Autonomous Reasoning Agent**
   * Use a reasoning pipeline to validate the LLM-as-a-judge output.

**Next Steps**
   * Scale this into batch processing of multiple files or a directory of research papers.


This workflow demonstrates a weakly supervised approach to labeling documents, specifically focusing on identifying and classifying dataset mentions in research papers. 
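
The filtering step in the validation stage can be sketched as follows. This is a minimal illustration, assuming the judge attaches a boolean `valid` flag to each mention as described in the workflow; the helper name `keep_valid_mentions` and the sample records are hypothetical and not part of the notebook's code.

```python
from typing import Dict, List


def keep_valid_mentions(mentions: List[Dict]) -> List[Dict]:
    """Keep only entries whose (assumed) `valid` flag is exactly True."""
    return [m for m in mentions if m.get("valid") is True]


# Hypothetical judge output; only the `valid` flag comes from the workflow above.
judged = [
    {"raw_name": "Demographic and Health Survey (DHS)", "valid": True},
    {"raw_name": "World Bank", "valid": False},
    {"raw_name": "various sources"},  # a missing flag is treated as invalid
]

valid_only = keep_valid_mentions(judged)
print([m["raw_name"] for m in valid_only])
```

Treating a missing flag as invalid is a conservative choice for this sketch; a real pipeline might prefer to log such entries for review instead.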

Install Required Packages

In [21]:
%%capture
!pip install pymupdf openai nltk scikit-learn python-dotenv networkx transformers



Helper Functions

In [2]:
import pymupdf
import requests
import tempfile


def load_doc(fname_or_url: str, n_pages: int = 1) -> list:
    """
    Loads a PDF document from a file or URL and extracts content from it.

    Args:
        fname_or_url (str): The path to the PDF file or a URL where the file can be downloaded.
        n_pages (int, optional): The number of pages to extract. Defaults to 1.

    Returns:
        list: A list of dictionaries containing the extracted text and page indices.

    Raises:
        ValueError: If the number of pages is not greater than 0.
        Exception: If the PDF file fails to download from the specified URL or if there's an issue loading the document.
    """

    # Validate that the number of pages is greater than 0
    if n_pages <= 0:
        raise ValueError("The number of pages must be greater than 0.")

    def _load_doc(fname: str) -> list:
        """
        Creates content from windows of `n_pages` successive pages.

        Args:
            fname (str): The path to the PDF file.

        Returns:
            list: A list of dictionaries containing the extracted text and page indices.
        """

        # Initialize an empty list to store the contents
        contents = []

        # Open the PDF document
        doc = pymupdf.open(fname)

        # Iterate over the pages, skipping the last n_pages - 1 pages
        for page_idx in range(len(doc) - (n_pages - 1)):
            # Extract text from each of the next n_pages pages and store it as a dictionary
            contents.append(
                dict(
                    text="\n\n".join(
                        [doc[page_idx + i].get_text() for i in range(n_pages)]
                    ),
                    pages=[page_idx + i for i in range(n_pages)],
                )
            )

        # Validate that all pages were loaded successfully
        assert len(doc) - (n_pages - 1) == len(contents), "Failed to load all pages."

        return contents

    # Check if the file or URL starts with 'http:' or 'https:'
    if fname_or_url.startswith(("http:", "https:")):
        # Download the PDF file from the specified URL
        with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_pdf:
            response = requests.get(fname_or_url, stream=True)
            if response.status_code == 200:
                # Write the downloaded data to the temporary file
                for chunk in response.iter_content(chunk_size=8192):
                    temp_pdf.write(chunk)
                # Seek back to the beginning of the file and return the loaded document
                temp_pdf.seek(0)
                return _load_doc(temp_pdf.name)
            else:
                # Raise an exception if there's an issue with the download or loading the document
                raise Exception(
                    f"Failed to download PDF, status code: {response.status_code}"
                )

    else:
        # If it's not a URL, simply load the document from the specified file path
        return _load_doc(fname_or_url)
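
To make the windowing behavior of `load_doc` easier to see, here is a small sketch that applies the same sliding-window logic to a plain list of strings instead of a PDF. The function name `window_pages` and the toy page texts are assumptions for illustration only.

```python
from typing import Dict, List


def window_pages(page_texts: List[str], n_pages: int = 1) -> List[Dict]:
    """Group page texts into overlapping windows of `n_pages` consecutive pages."""
    if n_pages <= 0:
        raise ValueError("n_pages must be greater than 0.")

    windows = []
    # The last n_pages - 1 pages cannot start a full window, so they are skipped
    for start in range(len(page_texts) - (n_pages - 1)):
        idxs = list(range(start, start + n_pages))
        windows.append(
            {"text": "\n\n".join(page_texts[i] for i in idxs), "pages": idxs}
        )
    return windows


toy_pages = ["page A", "page B", "page C"]  # stand-in for extracted page text
windows = window_pages(toy_pages, n_pages=2)
for w in windows:
    print(w["pages"], repr(w["text"]))
```

With `n_pages=1`, each window is a single page, which is the setting used in the rest of this notebook.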

In [3]:
# Load the document from a URL with PyMuPDF; a local filename also works, and you can loop over a list of URLs or filenames
url_path = "https://documents1.worldbank.org/curated/en/776741468181503442/pdf/The-local-socioeconomic-effects-of-gold-mining-evidence-from-Ghana.pdf"
loaded_doc = load_doc(url_path, n_pages=1)

In [4]:
# inspect the loaded document
len(loaded_doc)  # number of pages

47

In [5]:
print(loaded_doc[3]["text"][:500])

2 
1 Introduction 
The mining sector in Africa is growing rapidly and is the main recipient of foreign direct 
investment (World Bank 2011). The welfare effects of this sector are not well understood, 
although a literature has recently developed around this question. The main contribution of this 
paper is to shed light on the welfare effects of gold mining in a detailed, in-depth country study 
of Ghana, a country with a long tradition of gold mining and a recent, large expansion in capital-
i


Load utility functions

In [6]:
import json
import os
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def find_best_matching_span(text, snippet, window: int = 1):
    sents = sent_tokenize(text)
    tfidf = TfidfVectorizer(ngram_range=(1, 3))
    mi_vec = tfidf.fit_transform([snippet])
    sents_vec = tfidf.transform(sents)

    mx_idx = cosine_similarity(mi_vec, sents_vec).flatten().argmax()
    span_sents = sents[max(mx_idx - window, 0) : min(mx_idx + window + 1, len(sents))]

    return {
        "match_idx": mx_idx,
        "match_sent": sents[mx_idx],
        "match_span_sents": span_sents,
        "match_span": " ".join(span_sents),
    }


def find_empirical_span(
    text: str, sentences: list, best_match_idx: int, window: int = 1
):
    # Define the start and end indices to include adjacent sentences for context
    start_idx = text.index(sentences[max(best_match_idx - window, 0)])
    last_sent = sentences[min(best_match_idx + window, len(sentences) - 1)]
    # NOTE: `last_sent` may also occur earlier in the text, so the search for it
    # starts at `start_idx` rather than at the beginning of the text.
    end_idx = start_idx + text[start_idx:].index(last_sent) + len(last_sent)

    # Extract the final span
    context_span = text[start_idx:end_idx]

    return {
        "empirical_span": context_span,  # Extracted span
        "start_idx": start_idx,
        "end_idx": end_idx,
    }


def get_empirical_mentioned_in(
    text, mentioned_in, window: int = 1, with_match_output: bool = False
):
    """
    Extract the most relevant span of text from the original document (`text`)
    that matches the `mentioned_in` field. Returns the span, label, start, and end indices.
    """
    # Tokenize the text into sentences
    sentences = sent_tokenize(text)
    match_output = find_best_matching_span(text, mentioned_in, window=window)
    best_match_idx = match_output["match_idx"]

    output = find_empirical_span(text, sentences, best_match_idx, window=window)
    output["empirical_mentioned_in"] = output.pop("empirical_span")

    output = {
        "label": "mentioned_in",  # Label as "mentioned_in"
        **output,
    }

    if with_match_output:
        output.update(match_output)

    return output
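
The reason `find_empirical_span` searches for the closing sentence from `start_idx` onward can be illustrated with plain strings. This is a simplified sketch, not the notebook's function: the helper name `span_between` and the sample text are hypothetical.

```python
def span_between(text: str, first_sent: str, last_sent: str) -> dict:
    """Return the character span from `first_sent` through `last_sent`."""
    start_idx = text.index(first_sent)
    # Search for the closing sentence only from start_idx onward, so an earlier
    # duplicate of that sentence cannot produce an end index before the start.
    end_idx = start_idx + text[start_idx:].index(last_sent) + len(last_sent)
    return {"span": text[start_idx:end_idx], "start_idx": start_idx, "end_idx": end_idx}


# Illustrative text in which the closing sentence also appears earlier
sample_text = "See Table 1. We use the DHS. Results follow. See Table 1. Done."
result = span_between(sample_text, "We use the DHS.", "See Table 1.")
print(result["span"])

# A naive search from the beginning finds the earlier duplicate instead
naive_pos = sample_text.index("See Table 1.")
print(naive_pos < result["start_idx"])
```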

In [7]:
def chunk_text(text, tokenizer, max_length=500):
    """
    Split the text into chunks of max_length tokens, ensuring no chunk exceeds the model's token limit,
    and includes special tokens properly.

    Args:
        text (str): The input text to be chunked.
        tokenizer: The tokenizer to use for encoding and decoding.
        max_length (int): The maximum length of tokens allowed in each chunk, including special tokens.

    Returns:
        list: A list of text chunks as strings.
    """
    # Reserve space for special tokens (e.g., [CLS], [SEP])
    special_tokens_count = 2  # Adjust based on the tokenizer's special token usage
    chunk_size = max_length - special_tokens_count

    # Tokenize the text into token IDs without truncation
    tokens = tokenizer.encode(text, add_special_tokens=False)

    # Split the tokens into chunks
    chunks = []
    for i in range(0, len(tokens), chunk_size):
        token_chunk = tokens[i : i + chunk_size]
        # Add special tokens to the chunk
        token_chunk_with_specials = (
            [tokenizer.cls_token_id] + token_chunk + [tokenizer.sep_token_id]
        )
        # Decode the chunk back to text (named so it does not shadow `chunk_text`)
        decoded_chunk = tokenizer.decode(
            token_chunk_with_specials, skip_special_tokens=False
        )
        chunks.append(decoded_chunk)

    return chunks
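
The chunk-size arithmetic in `chunk_text` can be checked without loading a tokenizer. The sketch below works on a plain list of integer token IDs; the function name `chunk_token_ids` and the `n_special` parameter are assumptions for illustration.

```python
from typing import List


def chunk_token_ids(
    token_ids: List[int], max_length: int = 500, n_special: int = 2
) -> List[List[int]]:
    """Split token IDs into chunks that leave room for `n_special` special tokens."""
    chunk_size = max_length - n_special
    if chunk_size <= 0:
        raise ValueError("max_length must exceed the number of special tokens.")
    return [
        token_ids[i : i + chunk_size] for i in range(0, len(token_ids), chunk_size)
    ]


ids = list(range(11))  # stand-in for tokenizer output
id_chunks = chunk_token_ids(ids, max_length=6)  # 6 - 2 = 4 content tokens per chunk
print(id_chunks)
```

Each chunk then grows by the two special tokens when they are added back, so the final length stays within `max_length`.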

In [8]:
def save_text_per_document(text, text_output_path, page_idx):
    """
    Save cleaned text for each page to a single JSON file, appending page data.

    Parameters:
        text (str): The cleaned text for the current page.
        text_output_path (str): The path to the text JSON file.
        page_idx (int): The current page index.

    Returns:
        None
    """
    # Load existing text data or create a new structure
    if os.path.exists(text_output_path):
        with open(text_output_path, "r", encoding="utf-8") as existing_file:
            text_data = json.load(existing_file)
    else:
        text_data = {
            "source": os.path.splitext(os.path.basename(text_output_path))[0],
            "pages": {},
        }

    # Add text for the current page
    text_data["pages"][str(page_idx + 1)] = text

    # Save the updated text data
    os.makedirs(os.path.dirname(text_output_path), exist_ok=True)
    with open(text_output_path, "w", encoding="utf-8") as text_file:
        json.dump(text_data, text_file, indent=4)

In [9]:
# load helper functions

from copy import deepcopy
import networkx as nx


def consolidate_dataset(raw_text: str, data: dict):
    text = raw_text
    page_data = {"dataset_used": data.get("dataset_used", False), "data_mentions": []}

    G = nx.Graph()
    sents = sent_tokenize(text)
    _datasets = []

    for ds in data.get("dataset", []):
        mentioned_in = ds.pop("mentioned_in") or ""

        try:
            mi = find_best_matching_span(mentioned_in, ds["raw_name"], window=0)
            mi = mi["match_span"]
            match_output = find_best_matching_span(text, mi, window=1)
        except ValueError:
            # Likely that the `mentioned_in` is not found in the text or not correct.
            # We try expanding the search to the entire text.
            match_output = find_best_matching_span(text, ds["raw_name"], window=1)

        ds["sent_spans"] = match_output["match_span_sents"]
        sents_idx = sorted([sents.index(s) for s in ds["sent_spans"]])
        ds["sent"] = match_output["match_sent"]
        ds["sent_idx"] = sents_idx

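        # Register every sentence index as a node so single-sentence spans are kept
        G.add_nodes_from(sents_idx)
        G.add_edges_from(zip(sents_idx[:-1], sents_idx[1:]))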
        _datasets.append(ds)

    _datasets = sorted(_datasets, key=lambda x: x["sent_idx"][0])

    # The connected components in the graphs form the `mentioned_in`s.
    mentioned_ins = sorted(
        [sorted(x) for x in nx.connected_components(G)], key=lambda x: x[0]
    )
    updated_mentions = []

    for midx in mentioned_ins:
        _mi = {"mentioned_in": " ".join([sents[i] for i in midx]), "datasets": []}

        for ds in _datasets:
            ds = deepcopy(ds)
            if ds["sent_idx"][0] in midx:
                ds.pop("sent_idx")
                ds.pop("sent_spans")
                _mi["datasets"].append(ds)

        updated_mentions.append(_mi)

    page_data["data_mentions"] = updated_mentions

    return page_data


def save_output_per_document(raw_text, data, output_path, page_idx):
    """
    Save output data to a JSON file per document, appending new page data.

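    Parameters:
        raw_text (str): The raw page text used to locate each mention's sentence span.
        data (dict): The extraction output for the page, e.g. `LabelledResponseFormat.model_dump()`.
        output_path (str): The output path for the document-wide JSON file.
        page_idx (int): The zero-based index of the page being processed.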

    Returns:
        None
    """

    # Restructure and consolidate dataset if possible
    page_data = consolidate_dataset(raw_text, data)

    # Initialize the new page's data structure
    page_data = {"page": page_idx + 1, **page_data}

    # Check if the file already exists
    if os.path.exists(output_path):
        with open(output_path, "r", encoding="utf-8") as existing_file:
            document_data = json.load(existing_file)
    else:
        # Create a new JSON structure
        document_data = {
            "source": os.path.splitext(os.path.basename(output_path))[0],
            "pages": [],
        }

    # Append the new page data
    document_data["pages"].append(page_data)

    # Save the updated document data back to the file
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as output_file:
        json.dump(document_data, output_file, indent=4)
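
The grouping idea in `consolidate_dataset` is that sentence windows sharing a sentence index end up in the same `mentioned_in` block. The notebook does this with connected components in a `networkx` graph; the sketch below is a dependency-free approximation that merges overlapping index sets instead. The function name `merge_sentence_windows` and the sample indices are hypothetical.

```python
from typing import List, Set


def merge_sentence_windows(windows: List[List[int]]) -> List[List[int]]:
    """Merge sentence-index windows that share at least one index."""
    merged: List[Set[int]] = []
    for window in windows:
        current = set(window)
        untouched = []
        for group in merged:
            if group & current:
                current |= group  # absorb every existing group that overlaps
            else:
                untouched.append(group)
        merged = untouched + [current]
    return sorted((sorted(g) for g in merged), key=lambda g: g[0])


# Two mentions with overlapping context windows, plus one isolated mention
groups = merge_sentence_windows([[2, 3, 4], [9, 10, 11], [4, 5, 6]])
print(groups)
```

Datasets whose best-matching sentence falls inside the same merged group would then be listed together under one `mentioned_in` text.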

In [10]:
import re


def clean_extracted_text(text):
    """
    Cleans text extracted from PDFs using PyMuPDF.
    - Reduces unnecessary whitespace and artifacts while preserving meaningful structure.
    - Prevents unintentional removal of spaces or concatenation of words.
    """

    # Replace non-breaking spaces (\xa0) with regular spaces
    text = text.replace("\xa0", " ")

    # Remove control characters (ASCII 0-31) except tabs and newlines
    text = re.sub(r"[\x00-\x08\x0B-\x1F]", "", text)

    # Collapse excessive newlines (more than 2) but preserve single newlines
    text = re.sub(r"\n{3,}", "\n\n", text)

    # Collapse multiple spaces but preserve single spaces between words
    text = re.sub(r"[ \t]{2,}", " ", text)

    # Rejoin words hyphenated across line breaks (e.g., "address-\nclimate" to "address-climate")
    text = re.sub(r"([a-zA-Z])-\n([a-zA-Z])", r"\1-\2", text)

    # Trim leading/trailing spaces and newlines
    text = text.strip()

    return text
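
A quick way to see what the cleaning steps do is to run them one at a time on a small string. The sketch below repeats four of the substitutions on an invented sample; note that it only rejoins a line break when a hyphen is actually present, which is an assumption of this sketch.

```python
import re

# Invented sample containing a non-breaking space, extra spaces,
# extra newlines, and a word hyphenated across a line break
raw = "Ghana\xa0Living   Standard Survey\n\n\n\nThe capital-\nintensive sector"

cleaned = raw.replace("\xa0", " ")  # non-breaking space to regular space
cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)  # collapse 3+ newlines to 2
cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)  # collapse runs of spaces/tabs
cleaned = re.sub(r"([a-zA-Z])-\n([a-zA-Z])", r"\1-\2", cleaned)  # rejoin hyphenated word
cleaned = cleaned.strip()

print(repr(cleaned))
```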

In [11]:
from typing import Callable


def should_process_page(text: str, classifier: Callable, tokenizer) -> bool:
    """Determine whether a page should be processed."""

    chunks = chunk_text(text, tokenizer, max_length=500)
    return any(classifier(chunk)[0]["label"] != "NO_DATA" for chunk in chunks)
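
The decision rule in `should_process_page` is simply "process the page if any chunk is not labeled `NO_DATA`". The sketch below shows that rule with a stub classifier so it runs without downloading a model. The keyword-based stub and the `DATA` label are hypothetical; only the `NO_DATA` label and the `[0]["label"]` access pattern come from the code above.

```python
from typing import Callable, Dict, List


def any_chunk_has_data(
    chunks: List[str], classifier: Callable, negative_label: str = "NO_DATA"
) -> bool:
    """Return True if the classifier flags at least one chunk as containing data."""
    return any(classifier(chunk)[0]["label"] != negative_label for chunk in chunks)


def keyword_classifier(chunk: str) -> List[Dict]:
    """Hypothetical stand-in for the Hugging Face text-classification pipeline."""
    label = "DATA" if "survey" in chunk.lower() else "NO_DATA"
    return [{"label": label, "score": 1.0}]


page_chunks = ["The mining sector is growing.", "We use the Ghana Living Standard Survey."]
print(any_chunk_has_data(page_chunks, keyword_classifier))
print(any_chunk_has_data(["No relevant content here."], keyword_classifier))
```

Because `any` short-circuits, the classifier stops being called as soon as one chunk is flagged, which keeps the prefilter cheap on long pages.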

## Weakly Supervised Labeling

In [12]:
from openai import OpenAI

# Optionally load the API key from a .env file (requires python-dotenv):
# from dotenv import load_dotenv
# load_dotenv()

API_KEY = "YOUR_API_KEY"
MODEL = "gpt-4o-mini"
client = OpenAI(api_key=API_KEY)  # initialize the client

## Create the prompt and Pydantic model for [Structured Outputs](https://platform.openai.com/docs/guides/structured-outputs)

In [13]:
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum


# Define Enums for categorical fields
class Context(str, Enum):
    background = "background"
    supporting = "supporting"
    primary = "primary"


class Specificity(str, Enum):
    properly_named = "properly_named"
    descriptive_but_unnamed = "descriptive_but_unnamed"
    vague_generic = "vague_generic"


class Relevance(str, Enum):
    directly_relevant = "directly_relevant"
    indirectly_relevant = "indirectly_relevant"
    not_relevant = "not_relevant"


class DatasetEntry(BaseModel):
    raw_name: Optional[str] = Field(
        ..., description="The exact dataset name as it appears in the text."
    )
    harmonized_name: Optional[str] = Field(
        None, description="The standardized or full name of the dataset."
    )
    acronym: Optional[str] = Field(
        None, description="The short name or acronym associated with the dataset."
    )
    context: Context
    specificity: Specificity
    relevance: Relevance
    mentioned_in: Optional[str] = Field(
        None, description="The exact text excerpt where the dataset is mentioned."
    )
    producer: Optional[str] = Field(
        None, description="The organization responsible for producing the dataset."
    )
    data_type: Optional[str] = Field(
        None, description="The type of data represented by the dataset."
    )


class LabelledResponseFormat(BaseModel):
    dataset: List[DatasetEntry] = Field(
        ..., description="A list of datasets mentioned in the paper."
    )
    dataset_used: bool = Field(
        ..., description="A boolean indicating if a dataset is used in the paper."
    )
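
The categorical fields rely on `str`-based enums, which restrict values to a fixed set while still comparing equal to plain strings. The sketch below shows that behavior with the standard library only; the `parse_context` helper is hypothetical and is not how Pydantic performs validation internally.

```python
from enum import Enum


class Context(str, Enum):
    background = "background"
    supporting = "supporting"
    primary = "primary"


def parse_context(value: str) -> Context:
    """Convert a raw string to a Context member, raising ValueError if unknown."""
    return Context(value)


ok = parse_context("primary")
print(ok is Context.primary, ok == "primary")

try:
    parse_context("main")  # not one of the allowed categories
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```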

In [14]:
DATA_USE_TASK_PROMPT = """You are an expert in extracting and categorizing dataset mentions from research papers and policy documents. Your task is to **identify and extract all valid dataset mentions**, ensuring they are correctly classified based on naming specificity, context, and relevance.

### **What Qualifies as a Dataset?**
A dataset is a structured collection of data used for empirical research, analysis, or policy-making. Examples include:
- **Surveys & Census Data** (e.g., LSMS, DHS, national census records)
- **Indicators & Indexes** (e.g., HDI, GFSI, WDI, ND-GAIN, EPI)
- **Geospatial & Environmental Data** (e.g., OpenStreetMap, Sentinel-2 imagery)
- **Economic & Trade Data** (e.g., UN Comtrade, Balance of Payments Statistics)
- **Health & Public Safety Data** (e.g., epidemiological surveillance, crime reports)
- **Time-Series & Energy Data** (e.g., climate projections, electricity demand records)
- **Transport & Mobility Data** (e.g., road accident statistics, smart city traffic flow)
- **Other emerging dataset types** as identified in the text.

**Important:**  
If the dataset does not fit into the examples above, infer the **most appropriate category** from the context and **create a new `"data_type"` if necessary**.

### **What Should NOT Be Extracted?**
Do **not** extract mentions that do not clearly refer to a dataset, including, but not limited to:
1. **Organizations & Institutions** (e.g., WHO, IMF, UNDP, "World Bank data" unless it explicitly refers to a dataset)
2. **Reports & Policy Documents** (e.g., "Fiscal Monitor by the IMF", "IEA Energy Report"; only extract if the dataset itself is referenced)
3. **Generic Mentions of Data** (e.g., "various sources", "survey results from multiple institutions")
4. **Economic Models & Policy Frameworks** (e.g., "GDP growth projections", "macroeconomic forecasts")
5. **Legislation & Agreements** (e.g., "Paris Agreement", "General Data Protection Regulation")

### **Rules for Extraction**
1. **Extract All Structured Data Mentions**
   - If the dataset is explicitly named (e.g., "Global Fishing Watch"), label it as `"properly_named"`.
   - If the dataset is described but not explicitly named (e.g., "electricity usage data from Albania"), label it as `"descriptive_but_unnamed"`.
   - If the dataset mention is too generic (e.g., "electricity usage data"), label it as `"vague_generic"`.

2. **Ensure `"data_type"` Is Always Assigned**
   - **Use an existing category if applicable.**
   - **If no suitable category exists, create a new `"data_type"` based on context.**

3. **Classify `"context"` Correctly**
   - `"primary"`: The dataset is used for direct analysis in the document.
   - `"supporting"`: The dataset is referenced to validate or compare findings.
   - `"background"`: The dataset is mentioned as general context or prior research.

   **Examples:**
   - `"The LSMS-ISA data is analyzed to assess the impact of agricultural practices on productivity."` → `"primary"`
   - `"Our results align with previous studies that used LSMS-ISA."` → `"supporting"`
   - `"LSMS-ISA is widely recognized as a reliable data source for agricultural research."` → `"background"`

4. **Capture Full Sentence Context**
   - The `"mentioned_in"` field must always include the **full sentence** where the dataset is referenced.
   - If a dataset is mistakenly extracted from an unrelated sentence, correct it.

### **Extraction Schema**
Each extracted dataset should have the following fields:
- `raw_name`: Exact dataset name from the text (**no paraphrasing**).
- `harmonized_name`: If properly named, use directly; if referenced in multiple ways, standardize using the most precise form in the text, otherwise, set this to None.
- `acronym`: Extract if explicitly mentioned.
- `mentioned_in`: **Full sentence** where the dataset appears (**no paraphrasing**).
- `context`: **primary / supporting / background**
- `specificity`: **properly_named / descriptive_but_unnamed / vague_generic**
- `relevance`: **directly_relevant / indirectly_relevant / not_relevant**
- `producer`: **Extract only if explicitly mentioned; otherwise, set to `None`.**
- `data_type`: **Assign based on existing categories, but create new ones if necessary.**

### **Handling New or Unlisted Data Types**
- If a dataset does not fit into existing categories, **infer an appropriate name** for its `"data_type"` based on context.
- Use a **general but informative label** for new data types (e.g., `"Climate Risk Data"`, `"Social Media Analytics"`).

### **Important: Do NOT Skip Unnamed Datasets**
If a dataset is described but lacks a proper name, extract it under `"descriptive_but_unnamed"` or `"vague_generic"`, whichever is appropriate.
If `"producer"` is not mentioned, set it to `None` rather than inferring."""

In [15]:
from typing import List, Optional
from transformers import AutoTokenizer
from transformers import pipeline
from tqdm.auto import tqdm


def process_document_extraction(fname_or_url: str):
    # Load the document one page at a time
    loaded_doc = load_doc(fname_or_url, n_pages=1)
    base_name = os.path.splitext(os.path.basename(fname_or_url))[0]

    # Results are written to per-document JSON files
    extraction_path = f"./output/extracted_data/{base_name}.json"
    text_output_path = f"./output/text/{base_name}.json"
    raw_text_output_path = f"./output/raw_text/{base_name}.json"

    # Use a fine-tuned BERT classifier to keep only pages that likely mention data

    data_model_id = "ai4data-use/bert-base-uncased-data-use"
    tokenizer = AutoTokenizer.from_pretrained(data_model_id)

    # load model from huggingface.co/models using our repository id
    classifier = pipeline(
        "text-classification", model=data_model_id, tokenizer=tokenizer
    )
    for page_idx, page in tqdm(
        enumerate(loaded_doc), desc="Processing pages", total=len(loaded_doc)
    ):
        raw_text = page["text"]
        text = clean_extracted_text(raw_text)

        # Save raw text for the page
        save_text_per_document(raw_text, raw_text_output_path, page_idx)

        # Save cleaned text for the page
        save_text_per_document(text, text_output_path, page_idx)
        # Skip empty pages and pages the classifier flags as containing no data
        if not raw_text or not should_process_page(text, classifier, tokenizer):
            # print(f"skipping {page_idx}, contains no data")
            continue

        # Process the page
        completion = client.beta.chat.completions.parse(
            model=MODEL,
            temperature=0.2,  # you can tweak this if you want
            messages=[
                {"role": "system", "content": DATA_USE_TASK_PROMPT},
                {"role": "user", "content": text},
            ],
            response_format=LabelledResponseFormat,
        )

        parsed_data = completion.choices[0].message.parsed

        # Save the extraction output
        save_output_per_document(
            raw_text, parsed_data.model_dump(), extraction_path, page_idx
        )

In [22]:
# uncomment to run
# process_document_extraction(url_path)
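
For the batch-processing next step, one possible shape is a loop that records a status per document and keeps going when a single file fails. This is only a sketch: `process_many` and `fake_process` are hypothetical names, and `fake_process` stands in for `process_document_extraction` so the example runs without API calls.

```python
from typing import Callable, Dict, Iterable


def process_many(
    sources: Iterable[str], process_fn: Callable[[str], None]
) -> Dict[str, str]:
    """Run `process_fn` on each source and record whether it succeeded."""
    results = {}
    for source in sources:
        try:
            process_fn(source)
            results[source] = "ok"
        except Exception as exc:  # keep going so one bad file does not stop the batch
            results[source] = f"failed: {exc}"
    return results


def fake_process(source: str) -> None:
    """Hypothetical stand-in for process_document_extraction."""
    if not source.endswith(".pdf"):
        raise ValueError("not a PDF")


status = process_many(["a.pdf", "notes.txt"], fake_process)
print(status)
```

In practice you would pass `process_document_extraction` and a list of URLs or file paths, and possibly add retries or rate limiting around the API calls.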

In [23]:
# inspect the output of the extraction

with open(
    "./output/extracted_data/The-local-socioeconomic-effects-of-gold-mining-evidence-from-Ghana.json",
    "r",
) as f:
    extracted_data = json.load(f)
    print(json.dumps(extracted_data, ensure_ascii=False, indent=2))

{
  "source": "The-local-socioeconomic-effects-of-gold-mining-evidence-from-Ghana",
  "pages": [
    {
      "page": 4,
      "dataset_used": true,
      "data_mentions": [
        {
          "mentioned_in": "We also allow for spillovers across \ndistricts, in a district-level analysis. We use two complementary geocoded household data sets \nto analyze outcomes in Ghana: the Demographic and Health Survey (DHS) and the Ghana \nLiving Standard Survey (GLSS), which provide information on a wide range of welfare \noutcomes. The paper contributes to the growing literature on the local effects of mining.",
          "datasets": [
            {
              "raw_name": "Demographic and Health Survey (DHS)",
              "harmonized_name": "Demographic and Health Survey (DHS)",
              "acronym": "DHS",
              "context": "primary",
              "specificity": "properly_named",
              "relevance": "directly_relevant",
              "producer": null,
              "data_t

In this step, we perform weakly supervised labeling of the document to extract dataset mentions. The extracted data includes structured information about each dataset, such as its name, context, specificity, and relevance. The GPT-4o-mini model is guided by a customized prompt and a fixed response schema, which encourages consistent extraction but does not guarantee correctness.
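
To work with the saved output downstream, it can help to flatten the nested `pages` / `data_mentions` / `datasets` structure into one row per dataset. The sketch below does this on an abridged copy of the output shown above; the function name `iter_dataset_rows` is hypothetical, and the `mentioned_in` text is shortened for readability.

```python
from typing import Dict, Iterator

# Abridged sample mirroring the structure of the extraction output
sample_output = {
    "source": "The-local-socioeconomic-effects-of-gold-mining-evidence-from-Ghana",
    "pages": [
        {
            "page": 4,
            "dataset_used": True,
            "data_mentions": [
                {
                    "mentioned_in": "We use two complementary geocoded household data sets ...",
                    "datasets": [
                        {
                            "raw_name": "Demographic and Health Survey (DHS)",
                            "acronym": "DHS",
                            "context": "primary",
                            "specificity": "properly_named",
                        }
                    ],
                }
            ],
        }
    ],
}


def iter_dataset_rows(document: Dict) -> Iterator[Dict]:
    """Yield one flat record per dataset, tagged with its page and mention text."""
    for page in document.get("pages", []):
        for mention in page.get("data_mentions", []):
            for ds in mention.get("datasets", []):
                yield {
                    "page": page["page"],
                    "mentioned_in": mention["mentioned_in"],
                    **ds,
                }


rows = list(iter_dataset_rows(sample_output))
print(rows[0]["page"], rows[0]["raw_name"], rows[0]["context"])
```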

### Next Step

The output from this step will be processed by the critic model (LLM-as-a-Judge) to validate and refine the extracted dataset mentions, ensuring their quality and correctness.